/*
 * Copyright (C) 2017 Intel Corporation.  All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


/*
 * message_queue.c
 *
 *  Created on: Oct 25, 2016
 *      Author: xwang98
 */

//system
#include <sys/time.h>

#include "agent_core_lib.h"


/*
 * A single message, kept as a node of the queue's doubly linked list.
 * Created by new_msg() and destroyed by free_msg().
 */
struct _msg_t
{
    struct _msg_t * next;   // following message in the queue, NULL for the tail
    struct _msg_t * prev;   // preceding message in the queue, NULL for the head
    msg_tag_t tag;          // tag supplied by the creator in new_msg()
    uint32_t len;   // set by post_msg_buffer() to the buffer length; new_msg() leaves it 0
    void * body;            // payload; handed to body_clean_handler by free_msg()
    time_t  time;           // time(NULL) taken when new_msg() built the message
    bh_msg_handler msg_handler;     // if set, get_msg_call_handler() invokes it with the message
    bh_msg_cleanup_f body_clean_handler;    // if set, free_msg() calls it on body
};


/*
 * A bounded FIFO of messages. post_msg() appends at the tail and
 * get_msg_timeout_us() removes from the head.
 */
struct _msg_queue_t
{
    pthread_mutex_t condition_mutex;    // locked around list updates in post_msg()/get_msg_timeout_us()
    pthread_cond_t  condition_cond;     // signalled by post_msg() when the queue goes from empty to one message
    pthread_condattr_t cond_attr;       // attributes for condition_cond; create_queue2() requests CLOCK_MONOTONIC
    unsigned int cnt;                   // number of messages currently linked in the list
    unsigned int max;                   // post_msg() rejects a message once cnt >= max
    uint32_t drops;                     // count of messages rejected by post_msg()
    msg_t head;                         // oldest message, NULL when the queue is empty
    msg_t tail;                         // newest message, NULL when the queue is empty
};

/*
 * Report how many messages post_msg() has rejected on this queue
 * because it was already full.
 */
inline uint32_t get_msg_queue_drops(msg_queue_t queue)
{
    const uint32_t dropped = queue->drops;

    return dropped;
}


/*
 * Return the tag that was given to new_msg() when the message was created.
 */
inline msg_tag_t get_msg_tag(msg_t msg)
{
    const msg_tag_t tag = msg->tag;

    return tag;
}

/*
 * Return the payload pointer stored in the message. The message keeps
 * the pointer: free_msg() still passes it to the body cleanup handler.
 */
inline void * get_msg_body(msg_t msg)
{
    void * payload = msg->body;

    return payload;
}

/*
 * Return the length recorded for the message body. It is the value
 * passed to post_msg_buffer(), or 0 for a message built only with new_msg().
 */
inline uint32_t get_msg_buffer_len(msg_t msg)
{
    const uint32_t length = msg->len;

    return length;
}


// Capacity (maximum number of queued messages) used by create_queue().
#define DEFAULT_QUEUE_MAX 100

/*
 * Body cleanup callback for heap buffers: releases the buffer with free().
 * post_msg_buffer() installs it as the cleanup handler of the messages it builds.
 */
static void body_handler_buffer(void * body)
{
    void * heap_buffer = body;

    free(heap_buffer);
}




/*
 * Allocate and initialise an empty message queue.
 *
 * max_queue_size: maximum number of messages the queue will hold;
 *                 post_msg() rejects further messages once it is reached.
 *
 * Returns the new queue, or NULL if the allocation or the initialisation of
 * the condition attributes, mutex or condition variable fails. On failure
 * everything acquired so far is released, so nothing leaks.
 * Release the queue with release_queue().
 */
msg_queue_t create_queue2(unsigned int max_queue_size)
{
    msg_queue_t queue = (msg_queue_t) malloc(sizeof(struct _msg_queue_t));
    if(queue == NULL)
        return NULL;

    memset((void*)queue, 0, sizeof(*queue));

    queue->max = max_queue_size;
    errno = pthread_condattr_init (&queue->cond_attr);
    if (errno) {
        perror ("pthread_condattr_init");
        goto fail_free;
    }

    // Timed waits in get_msg_timeout_us() compute their deadline from
    // CLOCK_MONOTONIC, so the condition variable must use the same clock.
    // A failure here is reported but (as before) not treated as fatal.
    errno = pthread_condattr_setclock (&queue->cond_attr, CLOCK_MONOTONIC);
    if (errno) {
        perror ("pthread_condattr_setclock");
    }

    errno = pthread_mutex_init(&queue->condition_mutex, NULL);
    if (errno) {
        perror ("pthread_mutex_init");
        goto fail_attr;
    }

    errno = pthread_cond_init(&queue->condition_cond, &queue->cond_attr);
    if (errno) {
        perror ("pthread_cond_init");
        goto fail_mutex;
    }

    return queue;

    // Unwind in reverse order of acquisition.
fail_mutex:
    pthread_mutex_destroy(&queue->condition_mutex);
fail_attr:
    pthread_condattr_destroy(&queue->cond_attr);
fail_free:
    free(queue);
    return NULL;
}

/*
 * Create a message queue with the default capacity (DEFAULT_QUEUE_MAX).
 * Returns whatever create_queue2() returns, i.e. NULL on failure.
 */
msg_queue_t create_queue()
{
    msg_queue_t queue = create_queue2(DEFAULT_QUEUE_MAX);

    return queue;
}

/*
 * Destroy a queue created by create_queue()/create_queue2().
 *
 * Tears down the synchronisation objects and frees the queue structure.
 * Messages that are still linked in the queue are not freed here.
 * queue must not be NULL.
 */
void release_queue(msg_queue_t  queue)
{
    pthread_mutex_t    * lock  = &queue->condition_mutex;
    pthread_cond_t     * cond  = &queue->condition_cond;
    pthread_condattr_t * attrs = &queue->cond_attr;

    pthread_mutex_destroy(lock);
    pthread_cond_destroy(cond);
    pthread_condattr_destroy(attrs);

    free(queue);
}

/*
 * Append a message to the tail of the queue.
 *
 * Ownership of msg always passes to this function: on success the queue
 * holds it until a consumer fetches it, and on failure it is released
 * with free_msg().
 *
 * Returns true if the message was queued, false if the queue was full
 * (in which case the drop counter is incremented).
 */
bool post_msg(msg_queue_t queue, msg_t msg)
{
    pthread_mutex_lock(&queue->condition_mutex);

    // The capacity check and the drop counter touch state shared with other
    // threads, so they must happen while the mutex is held.
    if(queue->cnt >= queue->max)
    {
        queue->drops ++;
        pthread_mutex_unlock( &queue->condition_mutex );

        free_msg(msg);  // free the msg automatically if failed.
        return false;
    }

    if (queue->cnt == 0)
    {
        assert (queue->head == NULL);
        assert (queue->tail == NULL);
        queue->head = queue->tail = msg;
        msg->next = msg->prev = NULL;
        queue->cnt = 1;

        // Queue went from empty to non-empty: wake a waiting consumer.
        pthread_cond_signal( &queue->condition_cond );
    }
    else
    {
        msg->next = NULL;
        msg->prev = queue->tail;
        queue->tail->next = msg;
        queue->tail = msg;
        queue->cnt  ++;
    }

    pthread_mutex_unlock( &queue->condition_mutex );

    return true;
}



/*
 * Wrap a heap buffer in a message and post it to the queue.
 *
 * buffer: address of the caller's pointer to a malloc'ed buffer. The buffer
 *         is always consumed: *buffer is set to NULL, and the memory is
 *         either owned by the queued message or freed if the message could
 *         not be created or posted.
 * len:    length recorded in the message, see get_msg_buffer_len().
 *
 * Returns true if the message was queued, false otherwise.
 */
bool post_msg_buffer(msg_queue_t queue, msg_tag_t tag, char **buffer, uint32_t len)
{
    msg_t wrapped = new_msg((void**)buffer, tag, body_handler_buffer);

    if(wrapped != NULL)
    {
        wrapped->len = len;
        return post_msg(queue, wrapped);
    }

    // new_msg() failed before taking over the buffer, so release it here.
    body_handler_buffer(*buffer);
    *buffer = NULL;
    return false;
}


/*
 * Post a private copy of a caller-owned buffer.
 *
 * The len bytes at body are duplicated into a freshly allocated buffer,
 * which is then handed to post_msg_buffer(); the caller keeps ownership
 * of body.
 *
 * Returns false if len is 0, if the copy cannot be allocated, or if
 * post_msg_buffer() fails; true otherwise.
 */
bool post_msg_dup(msg_queue_t queue, msg_tag_t tag, char * body, unsigned int len)
{
    // Nothing is allocated for an empty body; it is rejected like an allocation failure.
    char * copy = (len != 0) ? (char*)malloc(len) : NULL;

    if(copy == NULL)
    {
        return false;
    }

    memcpy(copy, body, len);

    return post_msg_buffer(queue, tag, &copy, len);
}

/*
 * Allocate a message that takes ownership of *body.
 *
 * body:         address of the caller's payload pointer. On success the
 *               message takes the pointer and *body is set to NULL; on
 *               failure *body is left untouched.
 * tag:          tag stored in the message.
 * body_cleaner: called by free_msg() on the payload; may be NULL.
 *
 * Returns the new message (length 0, no message handler, creation time set
 * to time(NULL)), or NULL if the allocation fails.
 */
msg_t  new_msg( void **body, msg_tag_t tag, bh_msg_cleanup_f body_cleaner)
{
    msg_t msg = (msg_t) malloc_z(sizeof(struct _msg_t));
    if(msg == NULL)
        return NULL;

    // Clear the whole structure. The previous sizeof(sizeof(...)) only
    // cleared sizeof(size_t) bytes.
    memset(msg, 0, sizeof(*msg));
    msg->len = 0;
    msg->body = *body;
    *body = NULL;   // take over the body
    msg->time = time(NULL);
    msg->tag = tag;
    msg->body_clean_handler = body_cleaner;

    return msg;
}

/*
 * Destroy a message: run its body cleanup handler on the payload, if a
 * handler is set, and then free the message itself. msg must not be NULL.
 */
void free_msg(msg_t msg)
{
    bh_msg_cleanup_f cleaner = msg->body_clean_handler;

    if(cleaner)
    {
        cleaner(msg->body);
    }

    free(msg);
}

/*
 * Remove and return the oldest message, waiting up to timeout_us
 * microseconds for one to arrive.
 *
 * timeout_us: 0 means poll: return immediately if the queue is empty.
 *
 * Returns the message, which the caller must release with free_msg(),
 * or NULL if the queue is still empty when the wait ends.
 */
msg_t get_msg_timeout_us(msg_queue_t queue, uint64_t timeout_us)
{
    msg_t msg = NULL;
    pthread_mutex_lock(&queue->condition_mutex);

    if( queue->cnt == 0 && timeout_us != 0)
    {
        struct timespec timeToWait;
        int wait_rc = 0;

        assert (queue->head == NULL);
        assert (queue->tail == NULL);

        // Absolute deadline on the same clock the condition variable was
        // created with (see create_queue2()).
        clock_gettime (CLOCK_MONOTONIC, &timeToWait);
        timespec_add_us(&timeToWait, timeout_us);

        // pthread_cond_timedwait() may wake up spuriously, so keep waiting
        // until a message is present or the wait reports timeout or error.
        while (queue->cnt == 0 && wait_rc == 0)
        {
            wait_rc = pthread_cond_timedwait(&queue->condition_cond,
                    &queue->condition_mutex,
                    &timeToWait);
        }
    }

    if (queue->cnt == 0)
    {
        assert (queue->head == NULL);
        assert (queue->tail == NULL);
    }
    else if (queue->cnt == 1)
    {
        assert (queue->head == queue->tail);

        msg = queue->head;
        queue->head = queue->tail = NULL;
        queue->cnt = 0;
    }
    else
    {
        msg = queue->head;
        queue->head = queue->head->next;
        queue->head->prev = NULL;
        queue->cnt--;
    }

    pthread_mutex_unlock(&queue->condition_mutex);

    return msg;
}

/*
 * Same as get_msg_timeout_us(), with the timeout given in milliseconds.
 *
 * The conversion is done in 64-bit unsigned arithmetic so that large
 * timeouts cannot overflow int. A negative timeout_ms still becomes a very
 * large unsigned value, exactly as before.
 */
msg_t get_msg_timeout_ms(msg_queue_t queue, int timeout_ms)
{
    return get_msg_timeout_us(queue, (uint64_t)timeout_ms * 1000);

}

/*
 * Same as get_msg_timeout_us(), with the timeout given in seconds.
 *
 * The conversion is done in 64-bit unsigned arithmetic: multiplying in int
 * overflowed for timeouts above 2147 seconds. A negative timeout_secs still
 * becomes a very large unsigned value, exactly as before.
 */
msg_t get_msg(msg_queue_t queue, int timeout_secs)
{

    return get_msg_timeout_us(queue, (uint64_t)timeout_secs * 1000 * 1000);
}

/*
 * Fetch messages and dispatch those that carry their own handler.
 *
 * Each message with a msg_handler is passed to that handler and then
 * released with free_msg(). The loop stops at the first message without a
 * handler, which is returned to the caller (who must free it), or when no
 * message arrives within the remaining time.
 *
 * timeout: overall time budget in seconds; a negative value makes the
 *          function return NULL without fetching anything.
 *
 * Returns a message that has no handler, or NULL.
 */
msg_t get_msg_call_handler(msg_queue_t queue, int timeout)
{
    msg_t  msg = NULL;
    tick_time_t now;
    int remaining_ms = timeout * 1000;
    tick_time_t last = bh_get_tick_ms();
    while(timeout >=0 && (msg = get_msg_timeout_ms(queue, remaining_ms)) && msg->msg_handler)
    {
        msg->msg_handler(msg);
        free_msg(msg);
        msg = NULL;

        now = bh_get_tick_ms();
        remaining_ms -= (now - last);
        last = now;

        // Once the budget is used up, only poll. A negative value would be
        // converted to a huge unsigned timeout and block almost forever.
        if (remaining_ms < 0)
            remaining_ms = 0;
    }

    return msg;
}
